[SPARK-54971] Add WITH SCHEMA EVOLUTION syntax for SQL INSERT #53732
Conversation
JIRA Issue Information: SPARK-54971 (Improvement). This comment was automatically generated by GitHub Actions.
if (!i.overwrite) {
  if (isByName) {
-   AppendData.byName(r, query)
+   AppendData.byName(
why not properly model the schemaEvolution flag in AppendData etc.?
@szehon-ho You mean we should add a flag schemaEvolution in place of writeOptions = schemaEvolutionWriteOption in v2 Write nodes, and add the schemaEvolutionWriteOption in https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala#L61 if the schemaEvolution flag is enabled? Do you have an example?
Having a dedicated flag would be cleaner: mergeSchema is fairly overloaded, e.g. it can also be a read option in Parquet that means inferring a superset schema from multiple Parquet files being read.
But DataFrame operations have always used mergeSchema to enable schema evolution in Delta and Iceberg. By reusing the mergeSchema option, we automatically get schema evolution working there.
If we introduce a new field, then until Delta/Iceberg pick it up, WITH SCHEMA EVOLUTION will essentially be ignored, which is not good.
I would use mergeSchema for now; we can still introduce a dedicated field later if we want to.
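For context, a minimal sketch of the existing DataFrame-level behavior this refers to; the mergeSchema write option is real, while the table names are placeholders:

// Delta and Iceberg already honor the "mergeSchema" write option on
// DataFrame appends, so reusing it makes WITH SCHEMA EVOLUTION work there
// without any connector changes. Table names are placeholders.
val df = spark.table("staging_events")
df.write
  .option("mergeSchema", "true")
  .mode("append")
  .saveAsTable("events")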
Makes sense, thanks
I was thinking it could be interesting to have Spark optionally call alterTable if the V2 data source has TableCapability.AUTOMATIC_SCHEMA_EVOLUTION (which we introduced when implementing MERGE INTO schema evolution in DSv2). That would ease the burden on the data sources. But it can be a future enhancement.
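A rough sketch of what that future enhancement could look like, assuming table, catalog, ident, and query from the surrounding resolution context; the capability check and alterTable call are real DSv2 APIs, but deriveSchemaChanges is a hypothetical helper:

import org.apache.spark.sql.connector.catalog.{TableCapability, TableChange}

// Hypothetical future enhancement, not part of this PR: evolve the table up
// front when the source opts in via the capability, instead of leaving schema
// merging entirely to the data source implementation.
if (table.capabilities().contains(TableCapability.AUTOMATIC_SCHEMA_EVOLUTION)) {
  // deriveSchemaChanges is a hypothetical helper that diffs the query schema
  // against the table schema and produces the corresponding TableChange list.
  val changes: Seq[TableChange] = deriveSchemaChanges(table.schema(), query.schema)
  catalog.alterTable(ident, changes: _*)
}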
Resolved review threads:
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala
  }
}

test("SPARK-54971: INSERT WITH SCHEMA EVOLUTION is currently unsupported") {
To cover the first case:

case InsertIntoStatement(l @ LogicalRelationWithTable(_: InsertableRelation, _),
-   parts, _, query, overwrite, false, _) if parts.isEmpty =>
+   parts, _, query, overwrite, false, _, withSchemaEvolution)
+   if parts.isEmpty && !withSchemaEvolution =>
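Since the diff above only shows the test declaration, here is a hedged sketch of what its body might look like; the keyword placement after the table name and the use of AnalysisException are assumptions, as the PR only states that the statement currently raises an exception:

// Hedged sketch: withTable/sql/intercept come from Spark's SQL test helpers;
// the exact WITH SCHEMA EVOLUTION placement and error type are assumptions.
test("SPARK-54971: INSERT WITH SCHEMA EVOLUTION is currently unsupported") {
  withTable("t") {
    sql("CREATE TABLE t (i INT) USING parquet")
    intercept[AnalysisException] {
      sql("INSERT INTO t WITH SCHEMA EVOLUTION SELECT 1 AS i, 'x' AS s")
    }
  }
}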
  }
}

testPartitionedTable("SPARK-54971: INSERT WITH SCHEMA EVOLUTION is currently unsupported") {
To cover the second case:

case i @ InsertIntoStatement(l @ LogicalRelationWithTable(t: HadoopFsRelation, table),
    parts, _, query, overwrite, _, _, withSchemaEvolution)
    if query.resolved && !withSchemaEvolution =>
validatePartitionSpec(partCols, i.partitionSpec)

val schemaEvolutionWriteOption: Map[String, String] =
  if (i.withSchemaEvolution) Map("mergeSchema" -> "true") else Map.empty
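For readers following the flow, a simplified sketch of where this option map ends up; AppendData.byName's writeOptions parameter exists in Spark's v2 commands, and r, query, and i are taken from the surrounding diff context:

// Simplified from the diff above: the per-statement option map is threaded
// into the v2 write node via AppendData.byName's writeOptions parameter.
AppendData.byName(r, query, writeOptions = schemaEvolutionWriteOption)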
why not add a new boolean field to AppendData, like what we did for InsertIntoStatement? MergeIntoTable also has a withSchemaEvolution flag.
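What that alternative could look like, sketched; the real AppendData node carries additional fields, and the new flag shown here is hypothetical:

// Hypothetical alternative (not what this PR does): carry the flag on the
// node itself, as MergeIntoTable does with withSchemaEvolution, instead of
// passing it through writeOptions. The field list is simplified.
case class AppendData(
    table: NamedRelation,
    query: LogicalPlan,
    writeOptions: Map[String, String],
    isByName: Boolean,
    withSchemaEvolution: Boolean = false) // hypothetical new field
  extends V2WriteCommand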
Hey @cloud-fan, thank you very much for your review! We also raised this point; this was our discussion on it:
@johanl-db: A big question is where we should apply schema evolution. For MERGE INTO, @szehon-ho did a lot of work to do schema evolution within Spark, so that the behavior is controlled by Spark and all data sources have the same behavior. From this PR, it seems the data source needs to implement INSERT schema evolution on its own?
DSv1 data sources still need to do schema evolution themselves; this doesn't change with this PR. This is somewhat orthogonal to this PR, which focuses on adding dedicated SQL syntax and doesn't intend to extend schema evolution support beyond what exists today (i.e. mostly provided by the DSv1 data source implementations themselves).
thanks for the explanation, merging to master!
What changes were proposed in this pull request?
Similar to the MERGE WITH SCHEMA EVOLUTION PR, this PR introduces the syntax WITH SCHEMA EVOLUTION for the SQL INSERT command. Since this syntax is not fully implemented for any table format yet, users will receive an exception if they try to use it.
When WITH SCHEMA EVOLUTION is specified, schema evolution-related features must be turned on for this single statement, and only for this statement.
In this PR, Spark is only responsible for recognizing the presence or absence of the WITH SCHEMA EVOLUTION syntax; this information is passed down from the Analyzer. When WITH SCHEMA EVOLUTION is detected, Spark sets the mergeSchema write option to true in the respective V2 insert command nodes. Data sources must respect the syntax and react appropriately: turn on the features that are categorised as "schema evolution" when the WITH SCHEMA EVOLUTION syntax is present.
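For illustration, a hedged sketch of the statement shape; the exact keyword placement is an assumption, and the table and column names are placeholders:

// Assumed syntax for illustration; with this PR alone the statement raises
// an exception, since no built-in table format implements INSERT schema
// evolution yet.
spark.sql(
  """INSERT INTO target WITH SCHEMA EVOLUTION
    |SELECT id, name, newly_added_column FROM source
    |""".stripMargin)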
Why are the changes needed?
This intuitive SQL syntax allows the user to request automatic schema evolution for a specific INSERT operation.
Some users would like schema evolution for DML commands such as MERGE, INSERT, ..., where the schemas of the table and the query relations can mismatch.
Does this PR introduce any user-facing change?
Yes, it introduces the SQL syntax WITH SCHEMA EVOLUTION for SQL INSERT.
How was this patch tested?
Added UTs.
Was this patch authored or co-authored using generative AI tooling?
No.